<configuration>
    <!-- Enable ResourceManager HA -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!-- Cluster id of the RM -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>rmcluster</value>
    </property>
    <!-- Logical ids of the RMs -->
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <!-- Hostname of each RM -->
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>flink02</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>flink03</value>
    </property>
    <!-- Address of the ZooKeeper quorum -->
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>flink01:2181,flink02:2181,flink03:2181</value>
    </property>
    <!-- Enable automatic recovery -->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <!-- Store ResourceManager state in the ZooKeeper cluster -->
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>
    <!-- Auxiliary services of the NodeManager -->
    <property>
        <description>A comma separated list of services where service name should only contain a-zA-Z0-9_ and can not start with numbers</description>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- Maximum number of ApplicationMaster attempts -->
    <property>
        <description>The maximum number of application attempts. It's a global setting for all application masters. Each application master can specify its individual maximum number of application attempts via the API, but the individual number cannot be more than the global upper bound. If it is, the resourcemanager will override it. The default number is set to 2, to allow at least one retry for AM.</description>
        <name>yarn.resourcemanager.am.max-attempts</name>
        <value>100</value>
    </property>
    <!-- Enforce physical memory limits -->
    <property>
        <description>Whether physical memory limits will be enforced for containers.</description>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>true</value>
    </property>
    <!-- Disable virtual memory limits -->
    <property>
        <description>Whether virtual memory limits will be enforced for containers.</description>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    <!-- Ratio between virtual memory and physical memory -->
    <property>
        <description>Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.</description>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>5</value>
    </property>
    <!-- Minimum memory for each container request -->
    <property>
        <description>The minimum allocation for every container request at the RM, in MBs. Memory requests lower than this will throw an InvalidResourceRequestException.</description>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <!-- Maximum memory for each container request -->
    <property>
        <description>The maximum allocation for every container request at the RM, in MBs. Memory requests higher than this will throw an InvalidResourceRequestException.</description>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>6144</value>
    </property>
    <!-- Minimum virtual CPU cores for each container request -->
    <property>
        <description>The minimum allocation for every container request at the RM, in terms of virtual CPU cores. Requests lower than this will throw an InvalidResourceRequestException.</description>
        <name>yarn.scheduler.minimum-allocation-vcores</name>
        <value>1</value>
    </property>
    <!-- Maximum virtual CPU cores for each container request -->
    <property>
        <description>The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this will throw an InvalidResourceRequestException.</description>
        <name>yarn.scheduler.maximum-allocation-vcores</name>
        <value>8</value>
    </property>
    <!-- Maximum physical memory the NodeManager may allocate to containers -->
    <property>
        <description>Amount of physical memory, in MB, that can be allocated for containers.</description>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>6144</value>
    </property>
    <!-- Maximum virtual CPU cores the NodeManager may allocate to containers -->
    <property>
        <description>Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.</description>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>8</value>
    </property>
    <!-- Enable log aggregation -->
    <property>
        <description>Whether to enable log aggregation. Log aggregation collects each container's logs and moves these logs onto a file-system, for e.g. HDFS, after the application completes. Users can configure the "yarn.nodemanager.remote-app-log-dir" and "yarn.nodemanager.remote-app-log-dir-suffix" properties to determine where these logs are moved to. Users can access the logs via the Application Timeline Server.</description>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- How long the NodeManager keeps local user logs (only applies when log aggregation is disabled) -->
    <property>
        <description>Time in seconds to retain user logs. Only applicable if log aggregation is disabled</description>
        <name>yarn.nodemanager.log.retain-seconds</name>
        <value>10800</value>
    </property>
</configuration>
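With both ResourceManagers configured, it is worth confirming which one is active and which is standby before going further. A quick check using the standard YARN admin CLI, with the rm-ids from the yarn-site.xml above:

# Ask each ResourceManager for its current HA state (active / standby)
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
# Confirm that all NodeManagers have registered with the active RM
yarn node -list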
# The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.address: flink01

# JVM heap size for the JobManager.
jobmanager.heap.size: 1024m

# JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
taskmanager.heap.size: 2048m

# The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.numberOfTaskSlots: 4

# Default parallelism for jobs.
parallelism.default: 2
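parallelism.default only takes effect when a job does not specify its own parallelism. A minimal sketch of overriding it at submission time with the CLI's -p option (the example jar path is only an illustration):

# Override parallelism.default for a single job at submit time
flink run -p 4 ./examples/streaming/WordCount.jar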
# Defines the high-availability mode used for the cluster execution. To enable high availability, set this mode to "ZOOKEEPER" or specify the FQN of a factory class.
# high-availability mode (required): The high-availability mode has to be set in conf/flink-conf.yaml to zookeeper in order to enable high availability mode. Alternatively, this option can be set to the FQN of the factory class Flink should use to create the HighAvailabilityServices instance.
high-availability: zookeeper

# File system path (URI) where Flink persists metadata in high-availability setups.
# Storage directory (required): JobManager metadata is persisted in the file system storageDir and only a pointer to this state is stored in ZooKeeper.
# The storageDir stores all metadata needed to recover from a JobManager failure.
high-availability.storageDir: hdfs://ns1/flink/recovery

# The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
# ZooKeeper quorum (required): A ZooKeeper quorum is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
high-availability.zookeeper.quorum: flink01:2181,flink02:2181,flink03:2181

# The root path under which Flink stores its entries in ZooKeeper.
# ZooKeeper root (recommended): The root ZooKeeper node, under which all cluster nodes are placed.
high-availability.zookeeper.path.root: /flink

# yarn.application-attempts: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the ApplicationMaster fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
yarn.application-attempts: 100
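Before starting the cluster, it helps to confirm that the HA storage directory exists on HDFS and to look at the Flink root znode in ZooKeeper. A quick sanity check, assuming the paths and quorum configured above:

# Create (or verify) the HA storage directory on HDFS
hdfs dfs -mkdir -p /flink/recovery
hdfs dfs -ls /flink
# Inspect the Flink root znode (run "ls /flink" inside the zkCli shell);
# once the cluster is up it will contain one child per cluster-id
zkCli.sh -server flink01:2181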
# The state backend to be used to store and checkpoint state.
state.backend: rocksdb

# The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers).
state.checkpoints.dir: hdfs://ns1/flink/flink-checkpoints

# The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
state.savepoints.dir: hdfs://ns1/flink/save-checkpoints

# Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option.
state.backend.incremental: true
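With this backend configured, savepoints can be written to the configured directory on demand. A sketch of triggering one for a running job (the job id is a placeholder, taken from the output of "flink list"):

# Trigger a savepoint for a running job into the configured savepoint directory
flink savepoint <jobId> hdfs://ns1/flink/save-checkpoints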
# Directories for temporary files, separated by ",", "|", or the system's java.io.File.pathSeparator.
io.tmp.dirs: /data/apps/flinkapp/tmp
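This directory is local to each machine, so it must exist on every node that can run a JobManager or TaskManager. One way to create it across the three hosts used in this setup (assuming passwordless ssh):

# Create the local temp directory on every node
for host in flink01 flink02 flink03; do
  ssh "$host" "mkdir -p /data/apps/flinkapp/tmp"
done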
Important: for Flink on YARN HA, never configure high-availability.cluster-id manually.
# The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.
# ZooKeeper cluster-id (recommended): The cluster-id ZooKeeper node, under which all required coordination data for a cluster is placed.
# Important: You should not set this value manually when running a YARN cluster, a per-job YARN session, or on another cluster manager. In those cases a cluster-id is automatically being generated based on the application id. Manually setting a cluster-id overrides this behaviour in YARN. Specifying a cluster-id with the -z CLI option, in turn, overrides manual configuration. If you are running multiple Flink HA clusters on bare metal, you have to manually configure separate cluster-ids for each cluster.
high-availability.cluster-id: /default
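The -z (--zookeeperNamespace) option mentioned above lets the CLI address an existing HA cluster under a specific ZooKeeper namespace without touching flink-conf.yaml. A hedged example (namespace name and jar path are only illustrations):

# Target a specific HA namespace from the CLI; this overrides any configured cluster-id
flink run -z my-ha-namespace ./examples/streaming/WordCount.jar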
# Start a YARN session named FlinkTestCluster
# JobManager memory: 2048 MB
# Each TaskManager gets 2048 MB of memory and 4 slots (the session cluster will automatically allocate additional containers which run the TaskManagers when jobs are submitted to the cluster)
# -d starts the session in detached mode
yarn-session.sh -jm 2048 -tm 2048 -s 4 -nm FlinkTestCluster -d
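Once the session is up, jobs can be submitted to it, and because log aggregation is enabled in yarn-site.xml, container logs can be pulled back after an application finishes. A sketch of a basic smoke test (application id is a placeholder; the WordCount jar path assumes the jar shipped with the Flink distribution):

# Submit the bundled streaming WordCount example to the running session
flink run ./examples/streaming/WordCount.jar
# List running and scheduled jobs
flink list
# Find the YARN application id of the session, then fetch its aggregated logs
yarn application -list
yarn logs -applicationId <applicationId>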